<html>

<!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
-->

<body>

<p>A software framework for easily writing applications which process vast 
amounts of data (multi-terabyte data-sets) in parallel on large clusters 
(thousands of nodes) built of commodity hardware in a reliable, fault-tolerant 
manner.</p>

<p>A Map-Reduce <i>job</i> usually splits the input data-set into independent 
chunks which are processed by <i>map</i> tasks in a completely parallel manner, 
followed by <i>reduce</i> tasks which aggregate their output. Typically both 
the input and the output of the job are stored in a 
{@link org.apache.hadoop.fs.FileSystem}. The framework takes care of monitoring 
tasks and re-executing failed ones. Since the compute nodes and the storage 
nodes are usually the same, i.e. Hadoop's Map-Reduce framework and Distributed 
FileSystem run on the same set of nodes, tasks can be scheduled effectively 
on the nodes where the data is already present, resulting in very high aggregate 
bandwidth across the cluster.</p>

<p>The Map-Reduce framework operates exclusively on <tt>&lt;key, value&gt;</tt> 
pairs i.e. the input to the job is viewed as a set of <tt>&lt;key, value&gt;</tt>
pairs and the output as another, possibly different, set of 
<tt>&lt;key, value&gt;</tt> pairs. The <tt>key</tt>s and <tt>value</tt>s have to 
be serializable as {@link org.apache.hadoop.io.Writable}s and additionally the
<tt>key</tt>s have to be {@link org.apache.hadoop.io.WritableComparable}s in 
order to facilitate grouping by the framework.</p>
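
<p>As a rough illustration of the contract described above, the following 
plain-Java sketch shows a key type that writes itself to a 
<tt>DataOutput</tt>, reads itself back from a <tt>DataInput</tt> and defines 
an ordering. It does not depend on Hadoop: <tt>YearKey</tt> and 
<tt>roundTrip</tt> are hypothetical names, and the class only mirrors the 
shape of a <tt>WritableComparable</tt> rather than implementing the real 
interface.</p>

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a WritableComparable key: it serializes itself,
// deserializes itself, and defines an ordering so equal keys can be grouped.
final class YearKey implements Comparable<YearKey> {
  private int year;

  YearKey() { }                            // empty instance, filled by readFields
  YearKey(int year) { this.year = year; }

  int getYear() { return year; }

  // Serialize the fields of this key.
  void write(DataOutput out) throws IOException {
    out.writeInt(year);
  }

  // Restore the fields of this key, in the order they were written.
  void readFields(DataInput in) throws IOException {
    year = in.readInt();
  }

  @Override
  public int compareTo(YearKey other) {
    return Integer.compare(year, other.year);
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof YearKey && ((YearKey) o).year == year;
  }

  @Override
  public int hashCode() {
    return year;
  }

  // Serializes a key to bytes and reads it back into a fresh instance.
  static YearKey roundTrip(YearKey key) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      key.write(new DataOutputStream(bytes));
      YearKey copy = new YearKey();
      copy.readFields(
          new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return copy;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

<p>The ordering and the serialized form are kept consistent here (both depend 
only on <tt>year</tt>), which is the property that grouping by key relies on 
in this sketch.</p>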

<p>Data flow:</p>
<pre>
                                (input)
                                <tt>&lt;k1, v1&gt;</tt>
       
                                   |
                                   V
       
                                  <b>map</b>
       
                                   |
                                   V

                                <tt>&lt;k2, v2&gt;</tt>
       
                                   |
                                   V
       
                                <b>combine</b>
       
                                   |
                                   V
       
                                <tt>&lt;k2, v2&gt;</tt>
       
                                   |
                                   V
       
                                 <b>reduce</b>
       
                                   |
                                   V
       
                                <tt>&lt;k3, v3&gt;</tt>
                                (output)
</pre>
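
<p>The flow in the diagram can be imitated in plain Java without any Hadoop 
classes. The sketch below is only an in-memory simulation under stated 
assumptions: <tt>DataFlowSketch</tt> and its methods are hypothetical names, 
each inner list passed to <tt>run</tt> stands in for the input of one map 
task, and counting words stands in for an arbitrary job.</p>

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory imitation of the map -> combine -> reduce flow.
final class DataFlowSketch {

  // map: <k1, v1> = <line number, line>  ->  zero or more <k2, v2> = <word, 1>
  static List<Map.Entry<String, Long>> map(long lineNumber, String line) {
    List<Map.Entry<String, Long>> out = new ArrayList<>();
    for (String word : line.trim().split("\\s+")) {
      if (!word.isEmpty()) {
        out.add(new SimpleEntry<String, Long>(word, 1L));
      }
    }
    return out;
  }

  // Sums the values of each key. Used both as the combine step (per map task)
  // and as the reduce step (over all map tasks); the TreeMap keeps keys sorted.
  static Map<String, Long> sumByKey(List<Map.Entry<String, Long>> pairs) {
    Map<String, Long> sums = new TreeMap<>();
    for (Map.Entry<String, Long> pair : pairs) {
      sums.merge(pair.getKey(), pair.getValue(), Long::sum);
    }
    return sums;
  }

  // Runs the whole flow; each inner list plays the role of one map task's input.
  static Map<String, Long> run(List<List<String>> splits) {
    List<Map.Entry<String, Long>> combined = new ArrayList<>();
    for (List<String> split : splits) {
      List<Map.Entry<String, Long>> mapped = new ArrayList<>();
      long lineNumber = 0;
      for (String line : split) {
        mapped.addAll(map(lineNumber++, line));
      }
      // combine: local aggregation of a single map task's output
      combined.addAll(sumByKey(mapped).entrySet());
    }
    // reduce: aggregation across the outputs of all map tasks
    return sumByKey(combined);
  }
}
```

<p>Reusing one function for both combine and reduce works in this sketch 
because summation is associative and commutative, so partial sums can be 
summed again without changing the result.</p>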

<p>Applications typically implement 
{@link org.apache.hadoop.mapred.Mapper#map(Object, Object, OutputCollector, Reporter)} 
and
{@link org.apache.hadoop.mapred.Reducer#reduce(Object, Iterator, OutputCollector, Reporter)} 
methods.  The application-writer also specifies various facets of the job, such
as the input and output locations and the <tt>Partitioner</tt>, <tt>InputFormat</tt> 
and <tt>OutputFormat</tt> implementations to be used, in 
a {@link org.apache.hadoop.mapred.JobConf}. The client program, 
{@link org.apache.hadoop.mapred.JobClient}, then submits the job to the framework 
and optionally monitors it.</p>

<p>The framework spawns one map task per 
{@link org.apache.hadoop.mapred.InputSplit} generated by the 
{@link org.apache.hadoop.mapred.InputFormat} of the job and calls 
{@link org.apache.hadoop.mapred.Mapper#map(Object, Object, OutputCollector, Reporter)} 
with each &lt;key, value&gt; pair read by the 
{@link org.apache.hadoop.mapred.RecordReader} from the <tt>InputSplit</tt> for 
the task. The intermediate outputs of the maps are then grouped by <tt>key</tt>
and optionally aggregated by a <i>combiner</i>. The key space of the intermediate 
outputs is partitioned by the {@link org.apache.hadoop.mapred.Partitioner}, where 
the number of partitions is exactly the number of reduce tasks for the job.</p>
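
<p>One common way to partition a key space is by hash code. The plain-Java 
sketch below illustrates that idea only; <tt>HashPartitionSketch</tt> and 
<tt>partitionFor</tt> are hypothetical names, and the class does not 
implement the <tt>Partitioner</tt> interface itself.</p>

```java
// Hypothetical sketch of hash-based partitioning of a key space.
final class HashPartitionSketch {

  // Maps a key to a partition index in [0, numReduceTasks).
  // Clearing the sign bit keeps the result non-negative even when
  // hashCode() returns a negative number.
  static int partitionFor(Object key, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
```

<p>Because the index depends only on the key, every occurrence of a given key 
lands in the same partition, and therefore at the same reduce task, no matter 
which map task emitted it.</p>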

<p>The reduce tasks fetch the sorted intermediate outputs of the maps via HTTP, 
merge the <tt>&lt;key, value&gt;</tt> pairs and call 
{@link org.apache.hadoop.mapred.Reducer#reduce(Object, Iterator, OutputCollector, Reporter)} 
for each <tt>&lt;key, list of values&gt;</tt> pair. The output of the reduce tasks is 
stored on the <tt>FileSystem</tt> by the 
{@link org.apache.hadoop.mapred.RecordWriter} provided by the
{@link org.apache.hadoop.mapred.OutputFormat} of the job.</p>
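
<p>The merge-and-group step on the reduce side can be pictured with a small 
in-memory sketch. It is an assumption-laden simplification: 
<tt>ReduceSideSketch</tt>, <tt>pair</tt>, <tt>mergeAndGroup</tt> and 
<tt>reduceAll</tt> are hypothetical names, a <tt>TreeMap</tt> stands in for 
the merge of sorted map outputs, and summation stands in for an arbitrary 
reduce function.</p>

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory imitation of the reduce side of a job.
final class ReduceSideSketch {

  // Convenience factory for one <key, value> pair.
  static Map.Entry<String, Long> pair(String key, long value) {
    return new SimpleEntry<String, Long>(key, Long.valueOf(value));
  }

  // Merges the outputs fetched from several maps into <key, list of values>.
  // The TreeMap iterates keys in sorted order, as a merge of sorted runs would.
  static Map<String, List<Long>> mergeAndGroup(
      List<List<Map.Entry<String, Long>>> mapOutputs) {
    Map<String, List<Long>> grouped = new TreeMap<>();
    for (List<Map.Entry<String, Long>> output : mapOutputs) {
      for (Map.Entry<String, Long> p : output) {
        grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>())
               .add(p.getValue());
      }
    }
    return grouped;
  }

  // Applies the reduce function (here: a sum) once per <key, list of values>.
  static Map<String, Long> reduceAll(Map<String, List<Long>> grouped) {
    Map<String, Long> result = new TreeMap<>();
    for (Map.Entry<String, List<Long>> group : grouped.entrySet()) {
      long sum = 0;
      for (long value : group.getValue()) {
        sum += value;
      }
      result.put(group.getKey(), sum);
    }
    return result;
  }
}
```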

<p>Map-Reduce application to perform a distributed <i>grep</i>:</p>
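<pre><tt>
import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Grep extends Configured implements Tool {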

  // <i>map: Search for the pattern specified by 'grep.mapper.regex' &amp;</i>
  //      <i>'grep.mapper.regex.group'</i>

  // <i>static, so that the framework can instantiate it without a Grep instance</i>
  public static class GrepMapper&lt;K&gt; 
  extends MapReduceBase  implements Mapper&lt;K, Text, Text, LongWritable&gt; {

    private Pattern pattern;
    private int group;

    public void configure(JobConf job) {
      pattern = Pattern.compile(job.get("grep.mapper.regex"));
      group = job.getInt("grep.mapper.regex.group", 0);
    }

    public void map(K key, Text value,
                    OutputCollector&lt;Text, LongWritable&gt; output,
                    Reporter reporter)
    throws IOException {
      String text = value.toString();
      Matcher matcher = pattern.matcher(text);
      while (matcher.find()) {
        output.collect(new Text(matcher.group(group)), new LongWritable(1));
      }
    }
  }

  // <i>reduce: Count the number of occurrences of the pattern</i>

  public static class GrepReducer&lt;K&gt; extends MapReduceBase
  implements Reducer&lt;K, LongWritable, K, LongWritable&gt; {

    public void reduce(K key, Iterator&lt;LongWritable&gt; values,
                       OutputCollector&lt;K, LongWritable&gt; output,
                       Reporter reporter)
    throws IOException {

      // sum all values for this key
      long sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }

      // output sum
      output.collect(key, new LongWritable(sum));
    }
  }
  
  public int run(String[] args) throws Exception {
    if (args.length &lt; 3) {
      System.out.println("Grep &lt;inDir&gt; &lt;outDir&gt; &lt;regex&gt; [&lt;group&gt;]");
      ToolRunner.printGenericCommandUsage(System.out);
      return -1;
    }

    JobConf grepJob = new JobConf(getConf(), Grep.class);
    
    grepJob.setJobName("grep");

    FileInputFormat.setInputPaths(grepJob, new Path(args[0]));
    FileOutputFormat.setOutputPath(grepJob, new Path(args[1]));

    grepJob.setMapperClass(GrepMapper.class);
    grepJob.setCombinerClass(GrepReducer.class);
    grepJob.setReducerClass(GrepReducer.class);

    // <i>property names must match those read in GrepMapper.configure()</i>
    grepJob.set("grep.mapper.regex", args[2]);
    if (args.length == 4)
      grepJob.set("grep.mapper.regex.group", args[3]);

    grepJob.setOutputFormat(SequenceFileOutputFormat.class);
    grepJob.setOutputKeyClass(Text.class);
    grepJob.setOutputValueClass(LongWritable.class);

    JobClient.runJob(grepJob);

    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Grep(), args);
    System.exit(res);
  }

}
</tt></pre>

<p>Notice how the data-flow of the above grep job is very similar to doing the
same with a Unix pipeline:</p>

<pre>
cat input/*   |   grep   |   sort    |   uniq -c   &gt;   out
</pre>

<pre>
      input   |    map   |  shuffle  |   reduce    &gt;   out
</pre>

<p>Hadoop Map-Reduce applications need not be written in 
Java<small><sup>TM</sup></small> only. 
<a href="../streaming/package-summary.html">Hadoop Streaming</a> is a utility
which allows users to create and run jobs with any executables (e.g. shell 
utilities) as the mapper and/or the reducer. 
<a href="pipes/package-summary.html">Hadoop Pipes</a> is a 
<a href="http://www.swig.org/">SWIG</a>-compatible <em>C++ API</em> for implementing
Map-Reduce applications (not based on JNI<small><sup>TM</sup></small>).</p>

<p>See <a href="http://labs.google.com/papers/mapreduce.html">Google's original 
Map/Reduce paper</a> for background information.</p>

<p><i>Java and JNI are trademarks or registered trademarks of 
Sun Microsystems, Inc. in the United States and other countries.</i></p>

</body>
</html>
